#==============================================================================#
# 5-relink-profiles.r
# Matt Curtis 27/06/22
# mjdcurtis@gmail.com
#------------------------------------------------------------------------------#
# pull all records from the huge file that link to the tree constructed before
# updated feb 9 2023
#==============================================================================#
#library(tidyverse)
#library(data.table)
library(arrow)
library(tidytable)
library(stringr)
#------------------------------------------------------------------------------#
# Start from an empty workspace and reclaim memory before the heavy reads.
rm(list = ls())
gc()

# Every path below is relative, so the working directory is set once here.
setwd("~/Replication package")

# Switch: reload the raw geni files or just assemble the final output.
# NOTE(review): nothing visible in this script reads `reload` - confirm it is
# still needed.
reload <- 0
#------------------------------------------------------------------------------#

# Family ids file, kept as an Arrow Table (as_data_frame = FALSE) rather than
# being converted to an R data frame.
families <- read_parquet(
  "./2_scripts/2_0_tempfiles/fr-family-ids.parquet",
  as_data_frame = FALSE
)

#------------------------------------------------------------------------------#
# Process the raw geni profile chunks one at a time. For each chunk:
#   1. read it and drop unneeded columns,
#   2. keep only the rows that join to `families`,
#   3. harmonise column types,
#   4. fill missing birth/death details from baptism/burial details,
#   5. drop the baptism/burial columns and save the chunk as a temp parquet.
# Chunk files are numbered 0..max_chunk.
max_chunk <- 15

# Join the pieces of a location into one string, separated by `sep`.
# Missing (NA) and empty pieces are skipped, so the result is "" exactly when
# every piece is missing or empty. A plain paste() would turn NA into the
# literal text "NA", and stripping ", , " afterwards would glue together two
# non-empty pieces that had an empty piece between them.
#
# ...  character vectors of equal length (one per location component)
# sep  separator placed between consecutive non-empty pieces
# Returns a character vector of the same length as the inputs.
join_nonempty <- function(..., sep = ", ") {
  joined <- NULL
  for (piece in list(...)) {
    piece <- as.character(piece)
    piece[is.na(piece)] <- ""
    if (is.null(joined)) {
      joined <- piece
    } else {
      # only add the separator where both sides actually have content
      glue <- ifelse(joined != "" & piece != "", sep, "")
      joined <- paste0(joined, glue, piece)
    }
  }
  joined
}

for (chunk_n in seq_len(max_chunk + 1) - 1) {
  progress <- paste0(chunk_n + 1, "/", max_chunk + 1)

  print(paste0("Reading chunk ", progress))
  f <- paste0(
    "./1_raw_data/1_2_geni/1_2_geni_chunked/geni_profiles/part-",
    chunk_n, ".parquet"
  )
  # Columns are dropped by position, so this relies on the column order of
  # the raw chunk files staying fixed.
  geni <- read_parquet(f, as_data_frame = FALSE) %>%
    dplyr::select(-c(
      7,                  # display name
      15, 16, 17, 18,     # birth end
      29, 30, 31, 32,     # baptism end
      43, 44, 45, 46,     # death end
      58, 59, 60, 61,     # burial end
      67, 68, 69, 70, 71  # last residence
    ))

  print(paste0("Joining: ", progress))
  # keep only the profile_ids we want; no `by` is given, so the join uses
  # every column name the two tables have in common
  geni <- dplyr::inner_join(families, geni) %>%
    dplyr::collect() %>%
    tidytable()
  gc()

  print(paste0("Donating from baptisms and burials: ", progress))

  # fix variable type issues: force one consistent type per kind of column
  geni <- geni %>%
    mutate(across(
      matches("(country|state|county|city|place_name|range|circa)$"),
      ~ as.character(.)
    )) %>%
    mutate(across(
      matches("(year|month|day)$"),
      ~ as.numeric(.)
    ))

  # paste together all the locations (one string per event)
  geni <- geni %>%
    mutate(
      birth_loc = join_nonempty(
        birth_city, birth_county, birth_state,
        birth_country, birth_place_name
      ),
      baptism_loc = join_nonempty(
        baptism_city, baptism_county, baptism_state,
        baptism_country, baptism_place_name
      ),
      death_loc = join_nonempty(
        death_city, death_county, death_state,
        death_country, death_place_name
      ),
      burial_loc = join_nonempty(
        burial_city, burial_county, burial_state,
        burial_country, burial_place_name
      )
    )

  # baptism replace missing births
  # Note: birth_loc itself is not rebuilt after the donation.
  geni <- geni %>%
    # location is donated when birth has none and baptism has one
    mutate(bapl_flag = birth_loc == "" & baptism_loc != "") %>%
    mutate(
      birth_country = case_when(
        bapl_flag ~ baptism_country,
        TRUE ~ birth_country
      ),
      birth_state = case_when(
        bapl_flag ~ baptism_state,
        TRUE ~ birth_state
      ),
      birth_county = case_when(
        bapl_flag ~ baptism_county,
        TRUE ~ birth_county
      ),
      birth_city = case_when(
        bapl_flag ~ baptism_city,
        TRUE ~ birth_city
      ),
      birth_place_name = case_when(
        bapl_flag ~ baptism_place_name,
        TRUE ~ birth_place_name
      )
    ) %>%
    # time is donated when the birth year is missing and the baptism year is
    # not; the flag is computed before birth_start_year is overwritten
    mutate(bapd_flag = is.na(birth_start_year) &
             !is.na(baptism_start_year)) %>%
    mutate(
      birth_start_year = case_when(
        bapd_flag ~ baptism_start_year,
        TRUE ~ birth_start_year
      ),
      birth_start_month = case_when(
        bapd_flag ~ baptism_start_month,
        TRUE ~ birth_start_month
      ),
      birth_start_day = case_when(
        bapd_flag ~ baptism_start_day,
        TRUE ~ birth_start_day
      ),
      birth_start_circa = case_when(
        bapd_flag ~ baptism_start_circa,
        TRUE ~ birth_start_circa
      ),
      birth_range = case_when(
        bapd_flag ~ baptism_range,
        TRUE ~ birth_range
      )
    )

  # burial replace missing deaths
  # NOTE(review): this step reuses the names bapl_flag / bapd_flag, so the
  # flags saved in the output describe the burial -> death donation and the
  # baptism -> birth flags are overwritten. Kept as-is to preserve the output
  # columns; confirm whether separate flags are wanted.
  geni <- geni %>%
    mutate(bapl_flag = death_loc == "" & burial_loc != "") %>%
    mutate(
      death_country = case_when(
        bapl_flag ~ burial_country,
        TRUE ~ death_country
      ),
      death_state = case_when(
        bapl_flag ~ burial_state,
        TRUE ~ death_state
      ),
      death_county = case_when(
        bapl_flag ~ burial_county,
        TRUE ~ death_county
      ),
      death_city = case_when(
        bapl_flag ~ burial_city,
        TRUE ~ death_city
      ),
      death_place_name = case_when(
        bapl_flag ~ burial_place_name,
        TRUE ~ death_place_name
      )
    ) %>%
    mutate(bapd_flag = is.na(death_start_year) &
             !is.na(burial_start_year)) %>%
    mutate(
      death_start_year = case_when(
        bapd_flag ~ burial_start_year,
        TRUE ~ death_start_year
      ),
      death_start_month = case_when(
        bapd_flag ~ burial_start_month,
        TRUE ~ death_start_month
      ),
      death_start_day = case_when(
        bapd_flag ~ burial_start_day,
        TRUE ~ death_start_day
      ),
      death_start_circa = case_when(
        bapd_flag ~ burial_start_circa,
        TRUE ~ death_start_circa
      ),
      death_range = case_when(
        bapd_flag ~ burial_range,
        TRUE ~ death_range
      )
    )

  # drop baptism and burial variables (this also drops baptism_loc and
  # burial_loc, whose names contain those words)
  geni <- select(
    geni,
    -matches(coll("baptism")),
    -matches(coll("burial"))
  )

  #----------------------------------------------------------------------------#
  print(paste0("Saving: ", progress))

  write_parquet(
    geni,
    paste0(
      "./2_scripts/2_0_tempfiles/fr-big-chunk-",
      chunk_n, "_", max_chunk, ".parquet"
    )
  )
  gc()
}
#------------------------------------------------------------------------------#
# Assemble the processed chunks into one table, join the columns from
# fr-pids-bd.parquet onto it, and write the result to fr-raw.parquet.
rm(list = ls())
gc()

temp_dir <- "./2_scripts/2_0_tempfiles"

# Chunk files are named fr-big-chunk-<chunk>_<max_chunk>.parquet. The pattern
# is anchored so that unrelated files that merely contain that text are not
# picked up.
files <- list.files(
  path = temp_dir,
  pattern = "^fr-big-chunk-[0-9]+_[0-9]+\\.parquet$"
)

if (length(files) == 0) {
  stop("No fr-big-chunk-*.parquet files found in ", temp_dir, call. = FALSE)
}

# The chunk files are not deleted after assembly, so files from an earlier
# run with a different number of chunks may still be present. Mixing them in
# would duplicate rows, and a partial set would silently drop rows, so insist
# on exactly one complete set.
chunk_max <- unique(
  sub("^fr-big-chunk-[0-9]+_([0-9]+)\\.parquet$", "\\1", files)
)
if (length(chunk_max) != 1 ||
    length(files) != as.integer(chunk_max) + 1) {
  stop(
    "Expected one complete set of chunk files in ", temp_dir,
    " but found: ", paste(files, collapse = ", "),
    call. = FALSE
  )
}

# Read every chunk as an Arrow Table, then stack them in list.files() order.
chunks <- lapply(files, function(file) {
  print(file)
  read_parquet(file.path(temp_dir, file), as_data_frame = FALSE)
})
output <- Reduce(rbind, chunks)
rm(chunks)
# file.remove(file.path(temp_dir, files))

# born_france
# The earlier de-duplication of pids (distinct rows, then keeping only
# profile_ids that appear once) is no longer needed as there are no dups.
pids <- read_parquet(
  "./2_scripts/2_0_tempfiles/fr-pids-bd.parquet",
  as_data_frame = FALSE
)

# The left join must not add rows; if it does, pids has duplicated keys.
before <- nrow(output)
output <- dplyr::left_join(output, pids) %>%
  dplyr::compute()
stopifnot(nrow(output) == before)

write_parquet(output, "./2_scripts/2_0_tempfiles/fr-raw.parquet")
rm(output)
gc()
